package com.ruyuan.dfs.client;

import com.google.protobuf.InvalidProtocolBufferException;
import com.ruyuan.dfs.client.config.FsClientConfig;
import com.ruyuan.dfs.client.exception.DfsClientException;
import com.ruyuan.dfs.client.tools.CommandLineListener;
import com.ruyuan.dfs.client.tools.OnMultiFileProgressListener;
import com.ruyuan.dfs.common.Constants;
import com.ruyuan.dfs.common.NettyPacket;
import com.ruyuan.dfs.common.enums.PacketType;
import com.ruyuan.dfs.common.exception.RequestTimeoutException;
import com.ruyuan.dfs.common.ha.BackupNodeManager;
import com.ruyuan.dfs.common.network.NetClient;
import com.ruyuan.dfs.common.network.RequestWrapper;
import com.ruyuan.dfs.common.network.file.FileTransportClient;
import com.ruyuan.dfs.common.network.file.OnProgressListener;
import com.ruyuan.dfs.common.utils.DefaultScheduler;
import com.ruyuan.dfs.common.utils.PrettyCodes;
import com.ruyuan.dfs.common.utils.StringUtils;
import com.ruyuan.dfs.model.backup.BackupNodeInfo;
import com.ruyuan.dfs.model.backup.INode;
import com.ruyuan.dfs.model.client.*;
import com.ruyuan.dfs.model.common.DataNode;
import com.ruyuan.dfs.model.namenode.ClientDataNodeInfo;
import com.ruyuan.dfs.model.namenode.ClientNameNodeInfo;
import com.ruyuan.dfs.model.namenode.ListFileRequest;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


/**
 * 文件系统客户端的实现类
 *
 * @author Sun Dasheng
 */
@Slf4j
public class FileSystemImpl implements FileSystem {

    //请求状态值
    private static final int AUTH_INIT = 0;
    private static final int AUTH_SUCCESS = 1;
    private static final int AUTH_FAIL = 2;
    //客户端设置
    private FsClientConfig fsClientConfig;
    //客户端网络通信
    private NetClient netClient;
    //线程调度任务器  调度什么的?
    private DefaultScheduler defaultScheduler;
    //可见性 认证状态
    private volatile int authStatus = AUTH_INIT;
    //NameNode  的备份 节点  为啥需要这个?
    private BackupNodeManager backupNodeManager;
    //命令行监听器  监听谁的?
    private CommandLineListener commandLineListener;

    //
    private ConcurrentHashMap<String, FileTransportClient> fileTransportClientPool = new ConcurrentHashMap<>();


    public FileSystemImpl(FsClientConfig fsClientConfig) {
        //初始化 客户端的 默认任务调度器
        this.defaultScheduler = new DefaultScheduler("FSClient-Scheduler-");
        //客户端 重试次数  指的哪方面的重试次数?
        int connectRetryTime = fsClientConfig.getConnectRetryTime() > 0 ? fsClientConfig.getConnectRetryTime() : -1;
        //客户端 网络通信 初始化
        this.netClient = new NetClient("FSClient-NameNode-" + fsClientConfig.getServer(), defaultScheduler, connectRetryTime);
        //保存起来
        this.fsClientConfig = fsClientConfig;
        //初始化 BackupNodeManager 这个备份 NameNode
        this.backupNodeManager = new BackupNodeManager(defaultScheduler);
    }

    /**
     * 设置命令行监听器
     *
     * @param commandLineListener 监听器
     */
    public void setCommandLineListener(CommandLineListener commandLineListener) {
        this.commandLineListener = commandLineListener;
    }

    /**
     * 启动本地文件client
     */
    public void start() throws Exception {
        //增加监听器(监听收到消息后的回调)
        this.netClient.addNettyPackageListener(this::onReceiveMessage);
        // 添加监听器，在断线重连的时候，自动发起认证
        this.netClient.addConnectListener(connected -> {
            if (connected) {
                //发起认证请求
                authenticate();
                //获取 backNode 消息
                fetchBackupInfo();
            }
        });
        //增加失败监听器
        this.netClient.addNetClientFailListener(() -> {
            log.info("dfs-client检测到NameNode挂了，标记NameNode已经宕机");
            backupNodeManager.markNameNodeDown();
            if (commandLineListener != null) {
                commandLineListener.onConnectFailed();
            }
        });
        //建立连接与 NameNode
        this.netClient.connect(fsClientConfig.getServer(), fsClientConfig.getPort());
        //如果连接断开了，会阻塞直到连接重新建立
        this.netClient.ensureConnected();
        log.info("和NameNode建立连接成功");
    }

    /**
     * 收到消息
     */
    private void onReceiveMessage(RequestWrapper requestWrapper) throws InvalidProtocolBufferException {
        PacketType packetType = PacketType.getEnum(requestWrapper.getRequest().getPacketType());
        if (packetType == PacketType.FETCH_BACKUP_NODE_INFO) {
            //客户端或者DataNode往NameNode上发起请求获取BackupNode信息
            handleFetchBackupNodeInfoResponse(requestWrapper);
        }
    }

    private void handleFetchBackupNodeInfoResponse(RequestWrapper requestWrapper) throws InvalidProtocolBufferException {
        if (requestWrapper.getRequest().getBody().length == 0) {
            log.info("拉取BackupNode信息为空，设置NetClient为无限重试.");
            netClient.setRetryTime(-1);
            return;
        }
        netClient.setRetryTime(3);
        BackupNodeInfo backupNodeInfo = BackupNodeInfo.parseFrom(requestWrapper.getRequest().getBody());
        //
        backupNodeManager.maybeEstablishConnect(backupNodeInfo, hostname -> {
            fsClientConfig.setServer(hostname);
            netClient.shutdown();
            log.info("检测到BackupNode升级为NameNode了，替换NameNode链接信息，并重新建立链接：[hostname={}, port={}]", fsClientConfig.getServer(), fsClientConfig.getPort());
            netClient = new NetClient("FSClient-NameNode-" + fsClientConfig.getServer(), defaultScheduler);
            try {
                //启动本地文件client
                start();
            } catch (Exception e) {
                log.error("连接失败：", e);
            }
        });
    }

    private void fetchBackupInfo() throws InterruptedException {
        //客户端或者DataNode往NameNode上发起请求获取BackupNode信息
        // 获取他信息干啥啊?
        log.info("客户端往NameNode上发起请求获取BackupNode信息");
        NettyPacket nettyPacket = NettyPacket.buildPacket(new byte[0], PacketType.FETCH_BACKUP_NODE_INFO);
        //发送消息
        netClient.send(nettyPacket);
    }

    /**
     * 发起认证请求
     * 因为这里需要发送请求并同步等待结果，onConnected 方法不能阻塞，
     * 否则会导致后面网络收发失败，所以需要新开线程处理
     */
    private void authenticate() {
        //默认调度器执行
        defaultScheduler.scheduleOnce("发起认证", () -> {
            try {
                String authInfo = fsClientConfig.getUsername() + "," + fsClientConfig.getSecret();
                AuthenticateInfoRequest req = AuthenticateInfoRequest.newBuilder().setAuthenticateInfo(authInfo).build();
                //建立网络包
                NettyPacket nettyPacket = NettyPacket.buildPacket(req.toByteArray(), PacketType.AUTHENTICATE);
                //同步发送
                NettyPacket resp = safeSendSync(nettyPacket);
                AuthenticateInfoResponse authenticateInfoResponse = AuthenticateInfoResponse.parseFrom(resp.getBody());
                authStatus = AUTH_SUCCESS;
                fsClientConfig.setUserToken(authenticateInfoResponse.getToken());
                //唤醒线程
                notifyAuthenticate();
                log.info("发起认证成功：[username={}, token={}]", fsClientConfig.getUsername(), authenticateInfoResponse.getToken());
                if (commandLineListener != null) {
                    //
                    commandLineListener.onAuthResult(true);
                }
            } catch (Exception e) {
                log.error("发起认证失败：", e);
                authStatus = AUTH_FAIL;
                //认证失败 执行关闭流程
                close();
                if (commandLineListener != null) {
                    commandLineListener.onAuthResult(false);
                }
            }
        });
    }

    /**
     * 创建目录
     */
    @Override
    public void mkdir(String path) throws Exception {
        mkdir(path, new HashMap<>(PrettyCodes.trimMapSize()));
    }

    @Override
    public void mkdir(String path, Map<String, String> attr) throws Exception {
        //验证参数
        validate(path);
        attr.put("hostname", fsClientConfig.getServer());
        attr.put("port", fsClientConfig.getPort() + "");

        //构建请求数据
        MkdirRequest request = MkdirRequest.newBuilder().setPath(path).putAllAttr(attr).build();

        //构建请求包
        NettyPacket nettyPacket = NettyPacket.buildPacket(request.toByteArray(), PacketType.MKDIR);
        // 同步发送给 nameNode
        safeSendSync(nettyPacket);
        log.info("创建文件夹成功：[filename={}]", path);
    }

    @Override
    public void put(String filename, File file) throws Exception {
        put(filename, file, -1, new HashMap<>(PrettyCodes.trimMapSize()));
    }

    @Override
    public void put(String filename, File file, int replicaNum) throws Exception {
        put(filename, file, replicaNum, new HashMap<>(PrettyCodes.trimMapSize()));
    }

    @Override
    public void put(String filename, File file, int numOfReplica, Map<String, String> attr) throws Exception {
        //     filename     服务器文件路径
        //     file             本地文件
        //    attr            文件属性
        //   numOfReplica 文件副本数量
        //   listener 监听器
        put(filename, file, numOfReplica, attr, null);
    }

    /**
     * <pre>
     *      1. 在NameNode中创建一个文件，接着NameNode返回一个DataNode机器列表
     *      2. 上传对应的文件到DataNode
     *      逻辑：
     *      1.构建文件基本属性元数据包 上传
     *      2.获得DataNodeList 循环遍历发送
     *      3.分配堆外内存 来将文件分块循环同步发送 每次发送完毕都会有监听器回调
     *      4.构建文件结束包  上传
     * </pre>
     */
    @Override
    public void put(String filename, File file, int replicaNum, Map<String, String> attr, OnProgressListener listener) throws Exception {
        //验证
        validate(filename);
        //文件副本数量
        if (replicaNum > Constants.MAX_REPLICA_NUM) {
            throw new DfsClientException("不合法的副本数量：" + replicaNum);
        }
        //遍历文件基本属性值
        for (String key : Constants.KEYS_ATTR_SET) {
            if (attr.containsKey(key)) {
                log.info("文件属性包含关键属性：[key={}]", key);
            }
        }
        //副本数量大于0 那么对应的起码得有 几个 DataNode节点吧
        if (replicaNum > 0) {
            attr.put(Constants.ATTR_REPLICA_NUM, String.valueOf(replicaNum));
        }
        attr.put("hostname", fsClientConfig.getServer());
        attr.put("port", fsClientConfig.getPort() + "");
        //构建创建文件请求
        CreateFileRequest request = CreateFileRequest.newBuilder().setFilename(filename).setFileSize(file.length()).putAllAttr(attr).build();
        //构建网络包 客户端往NameNode发送创建文件请求
        NettyPacket nettyPacket = NettyPacket.buildPacket(request.toByteArray(), PacketType.CREATE_FILE);
        //请求一： 客户端同步往 NameNode 发送创建文件请求
        NettyPacket resp = safeSendSync(nettyPacket);
        CreateFileResponse response = CreateFileResponse.parseFrom(resp.getBody());
        //根据 NameNode 返回的 dataNode 列表来 逐一发送文件数据块
        OnMultiFileProgressListener onMultiFileProgressListener = new OnMultiFileProgressListener(listener, response.getDataNodesList().size());
        //遍历文件存储服务器
        for (int i = 0; i < response.getDataNodesList().size(); i++) {
            //DataNode 节点
            DataNode dataNodes = response.getDataNodes(i);
            String hostname = dataNodes.getHostname();
            int port = dataNodes.getNioPort();
            FileTransportClient fileTransportClient = null;
            String clientPoolKey = hostname + "-" + port;
            FileTransportClient transportClient = fileTransportClientPool.get(clientPoolKey);
            if (transportClient == null) {
                //建立 DataNode 网络连接
                NetClient netClient = new NetClient("FSClient-DataNode-" + hostname, defaultScheduler);
                //支持文件上传、下载的客户端
                fileTransportClient = new FileTransportClient(netClient);
                //正式提出连接
                netClient.connect(hostname, port);
                //确保连接或者重试
                netClient.ensureConnected();
                netClient.addConnectListener((connected) -> {
                    if (!connected) {
                        //通道关闭的时候把pool也删除了
                        fileTransportClientPool.remove(clientPoolKey);
                    }
                });
            }

            log.info("开始上传文件到：[node={}:{}, filename={}]", hostname, port, filename);
            //强制发送
            fileTransportClient.sendFile(response.getRealFileName(), file.getAbsolutePath(), onMultiFileProgressListener, true);
            //每次发送完毕后 执行关闭线程池
            //fileTransportClient.shutdown();
            log.info("完成上传文件到：[node={}:{}, filename={}]", hostname, port, filename);
        }
        /*
         * 文件上传是上传到DataNode节点，客户端上传到DataNode之后，DataNode再上报给NameNode节点中间有一个时间差
         * 为了达到强一致性，保证文件上传后，立马是可以读取文件的，需要等待NameNode收到DataNode上报的信息，才认为是上传成功的
         * 但是这样一来会降低上传文件的吞吐量。 因为会占用NameNode一个线程池的线程在哪里hang住等待3秒，
         * 有可能让DataNode上报的请求在队列里面一直等待，最终出现超时错误。这里有两种方案可以选择：
         *
         * 1、 客户端可以配置让NameNode确认是否等待，如果开启确认等待，则吞吐量会下降，但是保证强一致性。如果不开启确认等待，则吞吐量比较高，
         *     但是一致性不能保证，就是说上传完毕后有可能立即读文件读不到
         *
         * 2、 在NameNode那边等待的过程，不要直接在线程里面等待，而是建立一个任务Task，保存在集合中，后台起一个线程，就无限循环的去判断
         *     这个Task是否完成，如果完成才写回响应。这种方式可以保证强一致性，并且不会阻塞线程池中的线程。
         *
         * 目前我们先采用第一种方式实现，第二种后面可以考虑优化实现。
         */
        //请求二：客户端往DataNode上传完文件之后，再发请求往NameNode确认
        NettyPacket confirmRequest = NettyPacket.buildPacket(request.toByteArray(), PacketType.CREATE_FILE_CONFIRM);
        confirmRequest.setTimeoutInMs(-1);
        confirmRequest.setAck(fsClientConfig.getAck());
        safeSendSync(confirmRequest);
    }

    @Override
    public void get(String filename, String absolutePath) throws Exception {
        get(filename, absolutePath, null);
    }

    @Override
    public void get(String filename, String absolutePath, OnProgressListener listener) throws Exception {
        //验证文件名字
        validate(filename);
        //构建获取文件请求包
        GetDataNodeForFileRequest request = GetDataNodeForFileRequest.newBuilder().setFilename(filename).build();
        //构建网络请求包
        //客户端往NameNode发送获取文件所在DataNode的请求
        NettyPacket nettyPacket = NettyPacket.buildPacket(request.toByteArray(), PacketType.GET_DATA_NODE_FOR_FILE);
        //同步发送请求
        NettyPacket resp = safeSendSync(nettyPacket);
        //获取 DataNode 节点信息
        GetDataNodeForFileResponse response = GetDataNodeForFileResponse.parseFrom(resp.getBody());
        DataNode dataNode = response.getDataNode();
        //建立网络通信
        NetClient netClient = new NetClient("FSClient-DataNode-" + dataNode.getHostname(), defaultScheduler);
        //文件支持器
        FileTransportClient fileTransportClient = new FileTransportClient(netClient);
        netClient.connect(dataNode.getHostname(), dataNode.getNioPort());
        netClient.ensureConnected();
        //开始发送请求下载文件的 网络包
        fileTransportClient.readFile(response.getRealFileName(), absolutePath, new OnProgressListener() {
            @Override
            public void onProgress(long total, long current, float progress, int currentReadBytes) {
                if (listener != null) {
                    listener.onProgress(total, current, progress, currentReadBytes);
                }
            }

            @Override
            public void onCompleted() {
                if (listener != null) {
                    listener.onCompleted();
                }
                fileTransportClient.shutdown();
            }
        });
    }

    @Override
    public void remove(String filename) throws Exception {
        validate(filename);
        RemoveFileRequest request = RemoveFileRequest.newBuilder().setFilename(filename).build();
        //客户端删除文件
        NettyPacket nettyPacket = NettyPacket.buildPacket(request.toByteArray(), PacketType.REMOVE_FILE);
        safeSendSync(nettyPacket);
    }

    @Override
    public Map<String, String> readAttr(String filename) throws Exception {
        validate(filename);
        ReadAttrRequest request = ReadAttrRequest.newBuilder().setFilename(filename).build();
        //客户端读取文件属性
        NettyPacket nettyPacket = NettyPacket.buildPacket(request.toByteArray(), PacketType.READ_ATTR);
        //同步发送
        NettyPacket resp = safeSendSync(nettyPacket);
        //解析文件属性值
        ReadAttrResponse response = ReadAttrResponse.parseFrom(resp.getBody());
        return response.getAttrMap();
    }

    @Override
    public void close() {
        this.defaultScheduler.shutdown();
        this.netClient.shutdown();
        fileTransportClientPool.forEach((k, v) -> {
            v.shutdown();
        });
    }

    @Override
    public List<FsFile> listFile(String path) throws Exception {
        validate(path);
        ListFileRequest listFileRequest = ListFileRequest.newBuilder().setPath(path).build();
        NettyPacket nettyPacket = NettyPacket.buildPacket(listFileRequest.toByteArray(), PacketType.CLIENT_LIST_FILES);
        NettyPacket responsePackage = safeSendSync(nettyPacket);
        INode node = INode.parseFrom(responsePackage.getBody());
        return FsFile.parse(node);
    }

    @Override
    public ClientNameNodeInfo nameNodeInfo() throws Exception {
        NettyPacket nettyPacket = NettyPacket.buildPacket(new byte[0], PacketType.CLIENT_READ_NAME_NODE_INFO);
        NettyPacket responseNettyPacket = safeSendSync(nettyPacket);
        return ClientNameNodeInfo.parseFrom(responseNettyPacket.getBody());
    }

    @Override
    public ClientDataNodeInfo dataNodeInfo() throws Exception {
        NettyPacket nettyPacket = NettyPacket.buildPacket(new byte[0], PacketType.CLIENT_READ_DATA_NODE_INFO);
        NettyPacket responseNettyPacket = safeSendSync(nettyPacket);
        return ClientDataNodeInfo.parseFrom(responseNettyPacket.getBody());
    }

    @Override
    public ReadStorageInfoResponse readStorageInfo(String filename) throws Exception {
        validate(filename);
        ReadStorageInfoRequest request = ReadStorageInfoRequest.newBuilder().setFilename(filename).build();
        NettyPacket nettyPacket = NettyPacket.buildPacket(request.toByteArray(), PacketType.CLIENT_READ_STORAGE_INFO);
        NettyPacket responseNettyPacket = safeSendSync(nettyPacket);
        return ReadStorageInfoResponse.parseFrom(responseNettyPacket.getBody());
    }

    @Override
    public PreCalculateResponse preCalculatePath(String path) throws Exception {
        validate(path);
        PreCalculateRequest request = PreCalculateRequest.newBuilder().setPath(path).build();
        NettyPacket nettyPacket = NettyPacket.buildPacket(request.toByteArray(), PacketType.CLIENT_PRE_CALCULATE);
        NettyPacket responseNettyPacket = safeSendSync(nettyPacket);
        return PreCalculateResponse.parseFrom(responseNettyPacket.getBody());
    }

    @Override
    public GetAllFilenameResponse getAllFilenameByPath(String path) throws Exception {
        validate(path);
        GetAllFilenameRequest request = GetAllFilenameRequest.newBuilder().setPath(path).build();
        NettyPacket nettyPacket = NettyPacket.buildPacket(request.toByteArray(), PacketType.CLIENT_GET_ALL_FILENAME);
        nettyPacket.setSupportChunked(true);
        NettyPacket responsePackage = safeSendSync(nettyPacket);
        return GetAllFilenameResponse.parseFrom(responsePackage.getBody());
    }

    /**
     * 确保返回的结果不是无权限
     */
    private NettyPacket safeSendSync(NettyPacket nettyPacket) throws DfsClientException, InterruptedException, RequestTimeoutException {
        nettyPacket.setUsername(fsClientConfig.getUsername());
        //是否连接过了
        if (fsClientConfig.getUserToken() != null) {
            nettyPacket.setUserToken(fsClientConfig.getUserToken());
        }
        //同步发送请求
        NettyPacket resp = netClient.sendSync(nettyPacket);
        //认证失败
        if (resp.isError()) {
            throw new DfsClientException(resp.getError());
        }
        return resp;
    }

    /**
     * 验证文件名称合法,校验连接已经认证通过
     *
     * @param filename 文件名称
     */
    private void validate(String filename) throws Exception {
        boolean ret = StringUtils.validateFileName(filename);
        if (!ret) {
            throw new DfsClientException("不合法的文件名：" + filename);
        }
        //认证初始化状态进行等待
        while (authStatus == AUTH_INIT) {
            //同步锁住当前实例
            synchronized (this) {
                //释放锁 等待 10 milliseconds
                wait(10);
            }
        }
        //从阻塞中醒来之后 检测是否通过了认证
        if (authStatus != AUTH_SUCCESS) {
            throw new DfsClientException("未通过认证，不能发送请求");
        }
    }

    /**
     * 认证成功后，唤醒等待的线程
     */
    private void notifyAuthenticate() {
        //谁锁住了这个实例啊?
        synchronized (this) {
            notifyAll();
        }
    }
}
